Specific Capabilities
Stop / Resume
Overview
When a stop is requested, NATS streams are preserved.
Stop vs Pause:
Pause keeps the workflow instance running in Temporal and keeps the k8s workloads up, but scaled down to 0 or in a suspended status.
When a stop is requested, all k8s workloads are removed and the Temporal workflow instance is terminated.
When a step fails in batch mode, it is possible to resume from the last failed state.
NOTE: This capability is tightly coupled with the stream retention policy, meaning that a pipeline past the retention limit will not be resumable.
Supported implementations
- Basic Python Runtime
- Basic Java Runtime
Enabling the functionality
Note:
You cannot delete a "stopped" pipeline. All subsequent runs will start from the last "succeeded" state.
# stop=true instead of force=true
curl -XDELETE http://localhost:9080/kflows/v1/jobx/jobx?stop=true
# resuming
curl -XPOST -H "Content-Type: application/json" --data-binary @topology.json http://localhost:9080/kflows/v1/jobx/jobx
Stopped status:
[
{
"closeTime": "2025-02-14T10:41:49.36354Z",
"devInsOuts": [],
"execution": {
"workflow_id": "jobx",
"run_id": "f1725055-b425-430e-86c4-5583d4f5d694"
},
"startTime": "2025-02-14T10:41:37.076232Z",
"status": "Stopped", # <====
"type": {
"name": "KFlowDSL"
}
}
]
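For illustration, the stopped state can be detected client-side by parsing this payload; a minimal sketch (the `pipeline_status` helper is not part of the SDK, and the field names are taken from the example above):

```python
import json

def pipeline_status(payload: str) -> str:
    """Extract the workflow status from a KFlow status payload
    (a JSON array of workflow descriptions, as shown above)."""
    entries = json.loads(payload)
    if not entries:
        raise ValueError("empty status payload")
    return entries[0]["status"]

sample = '[{"execution": {"workflow_id": "jobx"}, "status": "Stopped"}]'
print(pipeline_status(sample))  # -> Stopped
```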
RAW-Source
In certain scenarios, you may need to implement a custom source to fulfill your needs. The interface is as follows:
globalConfig: {...}
- id: myraw-source
type: raw-source # < ====
addr: host.docker.internal:9000
topics: testtopics
image: <myimage:latest>
meta: {...}
schema: |
{
"type": "record",
"name": "s3list",
"namespace": "tech.athea.kosmos",
"fields" : [
{"name": "num_of_lines", "type": "long"},
{"name": "format", "type": "string"},
{"name": "key", "type": "string"}
]
}
out:
- gencsv
...
Code sample
from faker import Faker
from kfbasicpy.runtime.basic import Function, SinkFunction
from kfbasicpy.sink.nats_sink import NatsSink
from kfbasicpy.runtime.config import loadTopology
from time import time
from dataclasses import dataclass, asdict
from asyncio import run, sleep
from typing import Any
import logging

logging.basicConfig(
    format='[%(name)s]: (%(asctime)s) => %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

@dataclass
class FakeData:
    randomfield: str

fake = Faker()
t = loadTopology()

def get_source() -> Function:
    # The source is the first node in the DAG with outgoing edges.
    for f in t.dag:
        if len(f.out) > 0:
            return f
    raise RuntimeError("kflow bad dag generation")

def get_sink() -> Function:
    # Sinks are the leaf nodes: no outgoing edges.
    for f in t.dag:
        if len(f.out) == 0:
            return f
    raise RuntimeError("kflow bad dag generation")

def faker_data() -> dict[str, Any]:
    return asdict(FakeData(
        randomfield=fake.url(),
    ))

async def nats_client() -> SinkFunction:
    return await NatsSink.build(get_sink())

async def event_generator() -> None:
    limiter: float = 5.0  # seconds between events
    start: float = time()
    pub: SinkFunction = await NatsSink.build(get_sink())
    while True:
        if time() - start >= limiter:
            start = time()
            data = faker_data()
            logger.info(f"writing @{start}: {data}")
            await pub.write(
                topics=get_sink().topics,
                event_dic=data,  # publish the same event that was logged
            )
        await sleep(0.1)  # yield control instead of busy-waiting

if __name__ == "__main__":
    run(event_generator())
BATCH: Dynamic update / Scale UP - DOWN / Pause - Resume / UpdatePolicy
Overview
Dedicated REST API to update a running flow.
- can be used to scale up process functions / sinks that are seen as a bottleneck in a running pipeline, or scale them down to free up resources
- can update a function's configuration in case of typos, fine-tuning, or hot-fixes
- pause / resume branches of the flow
Supported implementations
- Basic Python Runtime
- Basic Java Runtime
Enabling the functionality
CURL
yq -ojson $1 | curl -XPOST -H "Content-Type: application/json" --data-binary @- "http://localhost:9080/kflows/update/v1"
PAYLOAD
The payload needs to match the original input:
- same number of functions
- same vertices (same out for each function)
- same function id
Updateable fields:
- image
- imagePullPolicy
- topics
- parallelism
- user
- password
- addr
- meta
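These constraints can be checked before posting the payload; a minimal sketch, assuming the DAG is available as a list of dicts (`validate_update` is a hypothetical helper, not part of KFlow):

```python
def validate_update(original: list[dict], updated: list[dict]) -> None:
    """Check that an update payload matches the original topology:
    same number of functions, same ids, same `out` edges."""
    if len(original) != len(updated):
        raise ValueError("function count changed")
    for orig_fn, new_fn in zip(original, updated):
        if orig_fn["id"] != new_fn["id"]:
            raise ValueError(f"function id changed: {orig_fn['id']} -> {new_fn['id']}")
        if orig_fn.get("out", []) != new_fn.get("out", []):
            raise ValueError(f"edges changed for {orig_fn['id']}")

dag = [{"id": "source", "out": ["p1"]}, {"id": "p1", "out": []}]
scaled = [{"id": "source", "out": ["p1"]}, {"id": "p1", "out": [], "parallelism": 5}]
validate_update(dag, scaled)  # passes: only an updateable field changed
```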
globalConfig: {...}
dag:
- id: source
...
out: [p1]
- id: p1
type: raw
parallelism: 5 # scale-up to 5 pods
meta:
update: true # (optional, default false) allow this function to be updated
pause: false # (optional, default false) do not pause
updatePolicy: DropOffset # re-consume from the first stream sequence, regardless of offset
#updatePolicy: KeepOffset # (default) restart, but do not re-consume offsets already acknowledged
out: [sink]
- id: sink
...
STREAM: Dynamic update / Scale UP - DOWN / Pause - Resume
Overview
Dedicated REST API to update a running flow.
- can be used to scale up process functions / sinks that are seen as a bottleneck in a running pipeline, or scale them down to free up resources
- can update a function's configuration in case of typos, fine-tuning, or hot-fixes
- pause / resume branches of the flow
Supported implementations
- Basic Python Runtime
- Basic Java Runtime
Enabling the functionality
CURL
yq -ojson $1 | curl -XPOST -H "Content-Type: application/json" --data-binary @- "http://localhost:9080/kflows/update/v1"
PAYLOAD
The payload needs to match the original input:
- same number of functions
- same vertices (same out for each function)
- same function id
Updateable fields:
- image
- imagePullPolicy
- topics
- parallelism
- user
- password
- addr
- meta
globalConfig: {...}
dag:
- id: source
...
out: [p1]
- id: p1
type: raw
parallelism: 5 # scale-up to 5 pods
meta:
update: true # (optional, default false) allow this function to be updated; must be true to pause
pause: false # (optional, default false) do not pause
# restart (optional, default false): when false, "running" replicas are not
# restarted, so only new replicas pick up the new configuration.
# Useful when you only want to scale up/down.
# Note: if the modification impacts the deployment spec (e.g. resource
# policies), pods are restarted regardless of this setting.
restart: true
out: [sink]
- id: sink
...
Heartbeat for long message processing
Overview
In some cases, one event out of a hundred may legitimately take much longer to process. By default, a process function is given a timeout of 120 seconds to finish its transaction.
When the timer reaches that limit, the message is marked as "Failed Processed" and replayed. This is unwanted when it is legitimate for the use case to need more than 120 seconds.
To work around this, the SDK offers an API to reset the timer: the heartbeat method, available on the process function's Context object.
Supported implementations
- Basic Python Runtime
- Basic Java Runtime
Enabling the functionality
Python
def process(ctx: Context, ...) -> ...:
ctx.heartbeat() # this is a synchronous call
Java
public Record[] invoke(Context ctx, ...) {
ctx.heartbeat(); // this is a synchronous call
return ...
}
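For a long-running event, the idea is to call heartbeat at least once per unit of work so the 120-second timer never expires mid-message. A minimal sketch with a stand-in Context (the real one comes from the SDK; `handle` is a placeholder for user-defined work):

```python
class Context:
    """Stand-in for the SDK Context; counts heartbeats for illustration."""
    def __init__(self) -> None:
        self.beats = 0
    def heartbeat(self) -> None:
        self.beats += 1

def handle(chunk: str) -> None:
    pass  # placeholder for slow, user-defined work

def process(ctx: Context, event: dict) -> dict:
    # Call heartbeat at least once per chunk so a single long event
    # never goes 120 seconds without resetting the timer.
    for chunk in event.get("chunks", []):
        handle(chunk)
        ctx.heartbeat()
    return event

ctx = Context()
process(ctx, {"chunks": ["a", "b", "c"]})
print(ctx.beats)  # -> 3
```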
StreamProcessFunction vs ProcessFunction
Overview
A ProcessFunction must return before the basic engine flushes to the NatsJS stream.
This can lead to issues such as:
- OOM when one message produces several
- slowed pipeline processing when each message produced from the input follows a "micro-batching" logic
The StreamProcessFunction capability provides the API required to publish messages to the next function in the DAG without returning.
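The difference can be sketched with asyncio.Queue standing in for the SDK's StreamQueue (names and fan-out factor are illustrative):

```python
import asyncio

async def process_fn(event: dict) -> list[dict]:
    # ProcessFunction style: all produced messages are buffered in memory
    # until the function returns; large fan-out can exhaust memory.
    return [{"part": i, **event} for i in range(3)]

async def stream_process_fn(event: dict, q: asyncio.Queue) -> None:
    # StreamProcessFunction style: each message is handed off as soon as
    # it is produced, so memory stays bounded.
    for i in range(3):
        await q.put({"part": i, **event})

async def main() -> tuple[list[dict], list[dict]]:
    batched = await process_fn({"key": "x"})
    q: asyncio.Queue = asyncio.Queue()
    await stream_process_fn({"key": "x"}, q)
    streamed = [q.get_nowait() for _ in range(q.qsize())]
    return batched, streamed

batched, streamed = asyncio.run(main())
print(len(batched), len(streamed))  # -> 3 3
```

Both produce the same messages; only the delivery pattern differs.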
Supported implementations
- Basic Python Runtime
Enabling the functionality
# topology.yaml
[...]
# set env KFLOW_STREAM_PROCESS_FUNCTION to true
# and implement the API or use inline
dag:
- id: list-s3-bucket
type: fs-source
format: list
addr: host.docker.internal:9000
topics: s3a://sas
user: minioadmin
password: minioadmin
meta:
checkpoint: false
out:
- map
- id: map
type: raw
image: registry.artemis.public/kflow-basic-python-runtime:dev
imagePullPolicy: Always
globalConfig:
messageStore:
enabled: true
schema: |
{
"type": "record",
"name": "s3list",
"namespace": "tech.athea.kosmos",
"fields" : [
{"name": "url", "type": "string"},
{"name": "lastModifiedUnixMilli", "type": "long"},
{"name": "bucket", "type": "string"},
{"name": "key", "type": "string"}
]
}
meta:
container:
env:
- name: KFLOW_STREAM_PROCESS_FUNCTION
value: "true" # < === Set to true
name: process
code: |
from typing import Any
from kfbasicpy.function.context import Context
from kfbasicpy.function.function import StreamProcessFunction
from kfbasicpy.function.stream_queue import StreamMsg, StreamQueue
async def process(ctx: Context | None, event_dict: dict[str, Any], q: StreamQueue) -> StreamMsg:
    for i in range(0, 1):
        print(f'{i} < - duplicate index')
        if ctx is not None:
            ctx.heartbeat()
        event_dict["key"] = "t" * 1024 * 1024 * 5  # ~5 MB payload
        await q.put(event_dict)
    return StreamMsg.OK
out:
- print0
- id: print0
type: raw
image: registry.artemis.public/kflow-basic-python-runtime:dev
imagePullPolicy: Always
schema: |
{
"type": "record",
"name": "s3list",
"namespace": "tech.athea.kosmos",
"fields" : [
{"name": "url", "type": "string"},
{"name": "lastModifiedUnixMilli", "type": "long"},
{"name": "bucket", "type": "string"},
{"name": "key", "type": "string"}
]
}
Message Store
Overview
Data interchange for events is done via NatsJS streams. Your NATS cluster configuration defines a MaxPayloadSize per message, 1 MB by default.
The recommended maximum payload size is 8 MB.
When activated, Message Store enables using AWS S3 as an alternative storage backend for messages bigger than the NATS cluster configuration allows, without losing streaming behaviour.
Only messages bigger than what NATS authorizes transit through AWS S3; messages that fit the NATS server limit transit normally, through the NATS stream.
Temporary data is purged when the flow is deleted.
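The routing decision can be sketched as a simple size check (a simplification, not the actual implementation; when maxPayloadSize is not set, the real limit is read from the NATS server configuration):

```python
MAX_PAYLOAD = 1 * 1024 * 1024  # default NATS max payload: 1 MB

def choose_transport(payload: bytes, max_payload: int = MAX_PAYLOAD) -> str:
    # Messages over the NATS limit go through S3 and the stream only
    # carries a reference; smaller messages transit NATS directly.
    return "s3" if len(payload) > max_payload else "nats"

print(choose_transport(b"x" * 100))                # -> nats
print(choose_transport(b"x" * (5 * 1024 * 1024)))  # -> s3
```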
Supported implementations
- Basic Java Runtime
- Basic Python Runtime
Enabling the functionality
By default, this feature is disabled. You need to explicitly request it per function, and only on functions that publish messages larger than the NATS authorized payload size.
Consumers of messages published using the messageStore feature do not need the messageStore capability activated.
jobId: jobx
parallelism: 1
globalConfig:
messageStore:
enabled: true # << ====
topics: kflowinternal
user: minioadmin
password: minioadmin
addr: host.docker.internal:9000 # << === dedicated s3 storage
# if maxPayloadSize is not set, kflow will request
# nats server deployment configuration to retrieve maxPayload
#maxPayloadSize: 12345 # in bytes < ===
dag:
- id: list-s3-bucket
type: fs-source
format: list
addr: host.docker.internal:9000
topics: s3a://sas
user: minioadmin
password: minioadmin
meta:
checkpoint: false
out:
- map
- id: map
type: raw
image: registry.artemis.public/kflow-basic-python-runtime:dev
imagePullPolicy: Always
globalConfig:
messageStore:
enabled: true # < === Request to enable messageStore
schema: |
{
"type": "record",
"name": "s3list",
"namespace": "tech.athea.kosmos",
"fields" : [
{"name": "url", "type": "string"},
{"name": "lastModifiedUnixMilli", "type": "long"},
{"name": "bucket", "type": "string"},
{"name": "key", "type": "string"}
]
}
meta:
container:
env:
- name: KFLOW_STREAM_PROCESS_FUNCTION
value: "true"
name: process
code: |
from typing import Any
from kfbasicpy.function.context import Context
from kfbasicpy.function.function import StreamProcessFunction
from kfbasicpy.function.stream_queue import StreamMsg, StreamQueue
async def process(ctx: Context | None, event_dict: dict[str, Any], q: StreamQueue) -> StreamMsg:
    for i in range(0, 1):
        print(f'{i} < - duplicate index')
        if ctx is not None:
            ctx.heartbeat()
        event_dict["key"] = "t" * 1024 * 1024 * 5  # ~5 MB payload, larger than the NATS limit
        await q.put(event_dict)
    return StreamMsg.OK
out:
- print0
# print0 has messageStore disabled;
# nonetheless, consuming messages published through the
# messageStore is supported
- id: print0
type: raw
image: registry.artemis.public/kflow-basic-python-runtime:dev
imagePullPolicy: Always
schema: |
{
"type": "record",
"name": "s3list",
"namespace": "tech.athea.kosmos",
"fields" : [
{"name": "url", "type": "string"},
{"name": "lastModifiedUnixMilli", "type": "long"},
{"name": "bucket", "type": "string"},
{"name": "key", "type": "string"}
]
}
meta:
name: process
code: |
from typing import Any
import io
import json
import logging
from kfbasicpy.function.context import Context
logger = logging.getLogger(__name__)
def process(
    ctx: Context,
    event: dict[str, Any],
) -> dict[str, Any]:
    if ctx is not None:
        logger.info(
            f"metadata: {ctx.get_downloader_object_metadata()} "
            f"path: {ctx.get_downloader_local_object_url()} "
        )
    # serialize the event to an in-memory buffer and log its size
    buff = io.BytesIO()
    data = json.dumps(event)
    buff.write(data.encode())
    print(buff.tell())
    return event
DevMode (StepByStep)
Overview
This mode is intended to be used with the Datapipeline UI. Usage through the CLI is possible but not recommended.
When the DAG is executed in this mode, each process function in the graph can be executed independently, with the exception of leaf (sink) and source nodes.
AWS S3 is used to trigger those functions.
The KFlow status endpoint returns an additional inDir and outDir for each function. A function is triggered by calling AWS S3 PutObject into its associated inDir; the function's output is stored in its outDir.
Below is an example with jobId = jobx and namespace = jobx:
curl -XGET http://localhost:9080/kflows/v1/jobx?id=jobx
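Triggering a function from code might look like the following sketch; the inDir value and the kflowinternal bucket are assumptions here, as the actual directories come from the status endpoint above:

```python
def trigger_key(in_dir: str, filename: str) -> str:
    """Object key for triggering a function: the file goes into its inDir."""
    return f"{in_dir.rstrip('/')}/{filename}"

# Hypothetical inDir; the real one is returned by the status endpoint.
key = trigger_key("jobx/map0/in", "testfile.csv")

# With an S3 client (e.g. boto3 against the S3-compatible endpoint),
# uploading would look like:
# s3 = boto3.client("s3", endpoint_url="http://host.docker.internal:9000")
# s3.put_object(Bucket="kflowinternal", Key=key, Body=open("testfile.csv", "rb"))
print(key)  # -> jobx/map0/in/testfile.csv
```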
Another endpoint retrieves, on a best-effort basis, the result(s) of an input. For instance, the request below asks for the result of the executed map0 function, using simulated data as input to list-s3-bucket, for jobId jobx in namespace jobx and file testfile.csv:
curl -s -H "Content-Type: application/json" -X POST "http://localhost:9080/kflows/devstore/v1" -d @- <<EOF | jq
{
"cursor": "testfile.csv",
"execFnId": "map0",
"simFnId": "list-s3-bucket",
"namespace": "jobx",
"jobId": "jobx"
}
EOF
Supported Implementations
- Basic Java Runtime
- Basic Python Runtime
Enabling the functionality
jobId: jobx
parallelism: 1
globalConfig:
devStore:
enabled: true
image: registry.artemis.technique/kflow-sensor:dev
imagePullPolicy: Always
user: minioadmin
password: minioadmin
topics: kflowinternal
addr: host.docker.internal:9000
dag:
- id: list-s3-bucket
type: fs-source
format: list
addr: host.docker.internal:9000
topics: s3a://sas
user: minioadmin
password: minioadmin
meta:
checkpoint: false
out:
- map
- id: map
type: raw
image: registry.artemis.public/kflow-basic-python-runtime:dev
imagePullPolicy: Always
globalConfig:
messageStore:
enabled: true
schema: |
{
"type": "record",
"name": "s3list",
"namespace": "tech.athea.kosmos",
"fields" : [
{"name": "url", "type": "string"},
{"name": "lastModifiedUnixMilli", "type": "long"},
{"name": "bucket", "type": "string"},
{"name": "key", "type": "string"}
]
}
meta:
container:
env:
- name: KFLOW_STREAM_PROCESS_FUNCTION
value: "true"
name: process
code: |
from typing import Any
from kfbasicpy.function.context import Context
from kfbasicpy.function.function import StreamProcessFunction
from kfbasicpy.function.stream_queue import StreamMsg, StreamQueue
async def process(ctx: Context | None, event_dict: dict[str, Any], q: StreamQueue) -> StreamMsg:
    for i in range(0, 1):
        print(f'{i} < - duplicate index')
        if ctx is not None:
            ctx.heartbeat()
        event_dict["key"] = "t" * 1024 * 1024 * 5  # ~5 MB payload
        await q.put(event_dict)
    return StreamMsg.OK
out:
- print0
- id: print0
type: raw
image: registry.artemis.public/kflow-basic-python-runtime:dev
imagePullPolicy: Always
schema: |
{
"type": "record",
"name": "s3list",
"namespace": "tech.athea.kosmos",
"fields" : [
{"name": "url", "type": "string"},
{"name": "lastModifiedUnixMilli", "type": "long"},
{"name": "bucket", "type": "string"},
{"name": "key", "type": "string"}
]
}
meta:
name: process
code: |
from typing import Any
import io
import json
import logging
from kfbasicpy.function.context import Context
logger = logging.getLogger(__name__)
def process(
    ctx: Context,
    event: dict[str, Any],
) -> dict[str, Any]:
    if ctx is not None:
        logger.info(
            f"metadata: {ctx.get_downloader_object_metadata()} "
            f"path: {ctx.get_downloader_local_object_url()} "
        )
    # serialize the event to an in-memory buffer and log its size
    buff = io.BytesIO()
    data = json.dumps(event)
    buff.write(data.encode())
    print(buff.tell())
    return event